home *** CD-ROM | disk | FTP | other *** search
- #!/usr/local/bin/python
-
- """
-
- RTSP Proxy v1.2
- ---------------
- Jonathan Hogg <jonathan@onegoodidea.com>
-
- Copyright (c) 1999 One Good Idea Limited <http://www.onegoodidea.com/>
-
- Permission to use, copy, modify, and distribute this software and its
- documentation for any purpose, without fee, and without a written agreement
- is hereby granted, provided that the above copyright notice and this
- paragraph and the following two paragraphs appear in all copies.
-
- IN NO EVENT SHALL ONE GOOD IDEA LIMITED BE LIABLE TO ANY PARTY FOR DIRECT,
- INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES, INCLUDING LOST
- PROFITS, ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION,
- EVEN IF ONE GOOD IDEA LIMITED HAS BEEN ADVISED OF THE POSSIBILITY OF SUCH
- DAMAGE.
-
- ONE GOOD IDEA LIMITED SPECIFICALLY DISCLAIMS ANY WARRANTIES, INCLUDING,
- BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
- FOR A PARTICULAR PURPOSE. THE SOFTWARE PROVIDED HEREUNDER IS ON AN "AS
- IS" BASIS, AND ONE GOOD IDEA LIMITED HAS NO OBLIGATIONS TO PROVIDE
- MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS.
-
-
- Usage:
-
- % RTSP_Proxy
-
-
- The proxy listens on port 7070 so that it doesn't need to be run as root
- to operate (although this can be easily changed down the bottom of the
- script). It is a very simple program and can get confused, but in it's
- present state is about as functional as Apple's rtsp_proxy but a lot less
- buggy.
-
- """
-
-
- import cPickle
- import sys
- import string
- import StringIO
- import re
- import time
- from threading import *
-
- from socket import *
- if not globals().has_key('IPPROTO_TCP'):
- IPPROTO_TCP = 6
-
- from select import *
-
- import urlparse
- try:
- if "rtsp" not in urlparse.uses_netloc:
- urlparse.uses_netloc.append("rtsp")
- except:
- pass
-
-
-
- #------------------------------------------------------------------------
-
- class Logger:
-
- def __init__( self, file = sys.stderr ):
- self._lastmsg = ''
- self._first = 1
- self._repeats = 0
- self._file = file
- self._file.write( "[log started]" )
- self._lock = Lock()
-
- def log( self, msg ):
- self._lock.acquire()
- if msg == self._lastmsg:
- if self._repeats == 0:
- self._file.write( ' (.' )
- self._file.write( '.' )
- self._repeats = self._repeats + 1
- else:
- if self._repeats > 0:
- self._file.write( ')' )
- self._file.write( '\n' )
- self._first = 0
- self._file.write( msg )
- self._repeats = 0
- self._file.flush()
- if self._repeats == 75 - len(msg):
- self._lastmsg = ''
- else:
- self._lastmsg = msg
- self._lock.release()
-
-
- logger = Logger()
- debug = logger.log
-
-
- def makeportrange( ports ):
-
- if len(ports) == 1:
- return "%d" % ports[0]
- else:
- return "%d-%d" % (ports[0], ports[-1])
-
-
-
- #------------------------------------------------------------------------
-
- def messageFromConnection(conn):
- buffer = conn.recv( 10240 )
- m = Message(buffer)
- print("\n-----------------------------\n%s----------------------\n" % m.getmessage())
- return m
-
- class Message:
-
- def __init__( self, messageString ):
-
- self._input = StringIO.StringIO(messageString)
- self._buffer = ""
- self.readcommand()
- self.readheaders()
- self.readcontent()
-
-
-
- def readdata( self ):
-
- self._buffer = self._buffer + self._input.read(1024)
-
-
- def getdata( self, length):
-
- while 1:
- if len(self._buffer) >= length:
- data = self._buffer[0:length]
- self._buffer = self._buffer[length:]
- return data
- else:
- self.readdata()
-
-
- def readline( self ):
-
- while 1:
- if self._buffer == "":
- self.readdata()
-
- pos = string.find( self._buffer, "\r\n" )
-
- if pos <> -1:
- line = self._buffer[:pos]
- self._buffer = self._buffer[pos+2:]
- return line
-
- self.readdata()
-
- def readcommand( self ):
-
- line = self.readline()
- bits = string.split( line )
- self._command = bits[0]
- self._arguments = bits[1:]
-
-
- def readheaders( self ):
-
- self._headerdict = {}
- self._headerlist = []
-
- while 1:
- line = self.readline()
- if line == "":
- break
- if line[0] in string.whitespace:
- header[1] = header[1] + string.lstrip(line)
- else:
- (field,value) = string.split( line, ":", 1 )
- header = [field, string.strip(value)]
- self._headerlist.append( header )
- self._headerdict[string.lower(field)] = header
-
-
- def readcontent( self ):
-
- length = self.getheader('content-length')
- if length:
- self._content = self.getdata( int(length) )
- else:
- self._content = ""
-
-
- def getmessage( self ):
-
- msg = self._command + " " + string.join( self._arguments ) + "\r\n"
-
- for header in self._headerlist:
- msg = msg + "%s: %s\r\n" % (header[0], header[1])
-
- msg = msg + "\r\n" + self._content
-
- return msg
-
-
- def getheader( self, field ):
-
- name = string.lower( field )
- if self._headerdict.has_key( name ):
- return self._headerdict[name][1]
- else:
- return None
-
-
- def setheader( self, field, value ):
-
- self._headerdict[string.lower(field)][1] = value
-
-
- def getcommand( self ):
-
- return self._command
-
-
- def setcommand( self, command ):
-
- self._command = command
-
-
- def getargs( self ):
-
- return self._arguments
-
-
- def setargs( self, args ):
- self._arguments = args
-
-
-
- #------------------------------------------------------------------------
-
- class Session( Thread ):
-
- START_PORT = 40000
- _currentport = START_PORT
-
- def __init__( self, conn, addr, _from, _to, archive ):
-
- Thread.__init__( self )
- self._clientconn = conn
- self._clientaddr = addr
- self._from = _from
- self._to = _to
- self._portsMapping = {}
- self._archive = archive
- self.setDaemon( 1 )
-
- def _allocateports( self, howmany ):
-
- start = Session._currentport
- sofar = 0
- socks = []
-
- while sofar < howmany:
-
- sock = socket( AF_INET, SOCK_DGRAM )
- port = Session._currentport
- Session._currentport = Session._currentport + 1
-
- try:
- sock.bind( ('',port) )
- except:
- sofar = 0
- start = self._currentport
- socks = []
-
- socks.append( (port,sock) )
- sofar = sofar + 1
- end = port
-
- debug( " allocated a port range at %d-%d" % (start,end) )
-
- return socks
-
-
- def sendclientmsg( self, msg ):
- debug("---------\nACTUALLY SENDING:\n" + msg.getmessage() + "---------------\n")
- self._clientconn.send( msg.getmessage() )
-
- def getservermsg( self ):
- return messageFromConnection( self._serverconn )
-
- def sendservermsg( self, msg ):
- self._serverconn.send( msg.getmessage() )
-
- def dispatch( self, msg ):
- command = msg.getcommand()
-
- debug( "GOT command: " + msg.getmessage() )
-
- if command == "DESCRIBE":
- self.do_passthrough( msg )
-
- elif command == "SETUP":
- self.do_setup( msg )
-
- elif command == "OPTIONS":
- self.do_passthrough( msg )
-
- elif command == "PLAY":
- self.do_play( msg )
- # we need to start playing back data
-
- else:
- self.sendservermsg( msg )
- response = self.getservermsg()
- self.sendclientmsg( response )
-
-
- def do_options( self, msg ):
-
- if self._client_type[:4] == 'QTS/' and self._server_type == 'QTSS/v66':
- debug( ' translating OPTIONS into a GET_PARAMETER ping for broken QuickTime' )
- msg.setcommand( 'GET_PARAMETER' )
- self.sendservermsg( msg )
-
- msg.setcommand( 'RTSP/1.0' )
- msg.setargs( ['200', 'OK'] )
- self.sendclientmsg( msg )
-
- else:
- self.sendservermsg( msg )
- response = self.getservermsg()
- self.sendclientmsg( response )
-
- def do_passthrough( self, msg ):
- command = msg.getcommand()
- outMsg = self._to[command][0]
- self._to[command] = self._to[command][1:]
- self.sendclientmsg(outMsg)
-
- def do_play(self, msg):
- # TODO -- start forcing out data in a timed way on the SETUP channels
- self._playback = Playback(self._archive, self._clientaddr, self._portsMapping)
- self._playback.start()
- command = msg.getcommand()
- outMsg = self._to[command][0]
- self._to[command] = self._to[command][1:]
- # change session
- outMsg.setheader('session', msg.getheader('session'))
- self.sendclientmsg(outMsg)
-
- def do_setup( self, msg ):
- # TODO -- parse SETUP message
- # grab corresponding archived SETUP message
- # create output port pair
- # modify server_port portion
- # add track, index mapping
-
- client_port = ''
-
- debug( " client requests of proxy:\n %s" % msg.getheader('transport') )
-
- for bit in string.split( msg.getheader('transport'), ";" ):
- bit = string.strip( bit )
-
- if string.find( bit, '=' ) > 0:
- name, value = string.split( bit, '=', 1 )
-
- if name == 'client_port':
- client_port = value
-
- if string.find( client_port, "-" ):
- startport,endport = string.split( client_port, "-" )
- clientports = range( int(startport), int(endport) + 1 )
- else:
- clientports = [ int(client_port) ]
-
- # create a port range
- portRange = self._allocateports(len(clientports))
-
- # recover old SETUP response (should be indexed by track ID)
- command = msg.getcommand()
- print "1->", self._to[command]
- print "2->", self._to[command][0]
- response = self._to[command][0]
- self._to[command] = self._to[command][1:]
- debug("OLD: ->" + str(response.getmessage()))
-
- # dig out track ID
- URI = msg.getargs()[0]
- trackID = 0 # TODO -- fix!!
- m = re.search("trackID=(\d+)", URI)
- if m != None: trackID = int(m.group(1))
- print "track ID = ", trackID
-
- # change session
- response.setheader('session', msg.getheader('session'))
-
- # salt away port mappings
- for index in range(len(clientports)):
- self._portsMapping[(trackID,index)] = (portRange[index][1],clientports[index])
-
- # TODO send hacked response
-
- debug( " server offers to proxy:\n " + response.getheader('transport') )
-
- aPortRange = map(lambda x: x[0], portRange)
- response.setheader( 'transport',
- 'RTP/AVP;unicast;client_port=%s;server_port=%s' % (client_port,
- makeportrange(aPortRange)))
-
- debug( " playback offers to client:\n " + response.getheader('transport') )
-
- self.sendclientmsg( response )
-
-
- def run( self ):
-
- # try:
- while 1:
- msg = messageFromConnection( self._clientconn )
- self.dispatch( msg )
-
- # except:
- debug( "taking down session" )
- self._clientconn.close()
- if self._serverconn:
- self._serverconn.close()
-
- def stop(self):
- self._clientconn.close()
-
-
-
- #------------------------------------------------------------------------
-
- class Listener:
-
-
- def __init__( self, port ):
-
- self._sock = socket( AF_INET, SOCK_STREAM )
- self._sock.bind( ('',port) )
- self._sock.setsockopt( IPPROTO_TCP, SO_REUSEADDR, 1 )
- self._sock.listen( 5 )
-
- def waitforclient( self , _from, to, archive):
-
- conn, addr = self._sock.accept()
- debug( "accepted connection from %s:%d" % addr )
- return Session( conn, addr[0], _from, to, archive )
-
- def stop( self ):
- self._sock.close()
-
-
-
- #------------------------------------------------------------------------
-
- class Playback( Thread ):
-
- def __init__( self, archive, clientaddr, portsByTrackID ):
- # portsByTrackID: hash with keys of the form (trackID, index)
- # and values of form (socket, clientPort)
- Thread.__init__( self )
- archive.seek(0)
- self._archive = archive
- self._clientaddr = clientaddr
- self._portsByTrackID = portsByTrackID
- self.setDaemon( 1 )
-
-
- def doplayback(self):
- time.sleep(1)
- self.startTime = time.time()
- self.archiveStartTime = None
- while 1:
- m = cPickle.load(self._archive)
- # look for a 'data'
- if m[1] != 'data': continue
- track = m[2]
- index = m[3]
- playTime = m[0]
- data = m[4]
- if self.archiveStartTime == None: self.archiveStartTime = playTime
- archiveDifference = playTime - self.archiveStartTime
- currentDifference = time.time() - self.startTime
- (socket, clientPort) = self._portsByTrackID[(track,index)]
- print "data len = %d; clientaddr = %s; client port = %d" % (len(data), str(self._clientaddr), clientPort)
- timeToWait = archiveDifference - currentDifference
- if timeToWait > 0.1:
- time.sleep(timeToWait)
- sys.stdout.write(".")
- sys.stdout.flush()
- bytesWrit = socket.sendto(data, (self._clientaddr, clientPort))
-
- def run( self ):
- debug( " starting RTP playback: " + str(self._portsByTrackID))
- self.doplayback()
-
-
-
- #------------------------------------------------------------------------
-
- def getRTSPMessages(f):
- fromClientMessages = {}
- toClientMessages = {}
- fromPending = []
- try:
- while 1:
- ob = cPickle.load(f)
- if ob[1] == 'messageFromClient':
- m = Message(ob[2])
- command = m.getcommand()
- if not fromClientMessages.has_key(command): fromClientMessages[command] = []
- fromClientMessages[command].append(m)
- fromPending.append(command)
- if ob[1] == 'messageToClient':
- m = Message(ob[2])
- command = fromPending[0]
- fromPending = fromPending[1:]
- if not toClientMessages.has_key(command): toClientMessages[command] = []
- toClientMessages[command].append(m)
- except Exception, e:
- print 'exception ',e
- # show traceback information
- tb = sys.exc_info()[2]
- print("Exception: " + str(sys.exc_info()[0]) + " line " + str(tb))
- print fromClientMessages,toClientMessages
- return fromClientMessages,toClientMessages
-
-
-
- def main( argv ):
- while 1:
- try:
- listener = Listener( 7272 )
- break
- except:
- print "sleeping 5"
- time.sleep(5)
-
-
- archive = open(argv[1], "r")
-
- _from,to = getRTSPMessages(archive)
-
- debug( "waiting for a client" )
-
- try:
- while 1:
- client = listener.waitforclient(_from,to, archive)
- listener.stop()
- client.start()
- while 1: time.sleep(100)
- finally:
- listener.stop()
- print "Stopping client"
- client.stop()
-
-
- if __name__ == "__main__":
- main( sys.argv )
-